home *** CD-ROM | disk | FTP | other *** search
/ Delphi Magazine Collection 2001 / Delphi Magazine Collection 20001 (2001).iso / DISKS / Issue56 / Alfresco / AAThdCpy.pas < prev    next >
Encoding:
Pascal/Delphi Source File  |  2000-03-01  |  7.0 KB  |  257 lines

  1. {*********************************************************}
  2. {* AAThdCpy                                              *}
  3. {* Copyright (c) Julian M Bucknall 1998-2000             *}
  4. {* All rights reserved.                                  *}
  5. {*********************************************************}
  6. {* Algorithms Alfresco: multithreaded multibuffered copy *}
  7. {*********************************************************}
  8.  
  9. {Note: this unit is released as freeware. In other words, you are free
  10.        to use this unit in your own applications, however I retain all
  11.        copyright to the code. JMB}
  12.  
  13. unit AAThdCpy;
  14.  
  15. interface
  16.  
  17. uses
  18.   SysUtils, Windows, Classes;
  19.  
  20. procedure AAThreadedCopyStream(aSrcStream, aDestStream : TStream);
  21.  
  22. implementation
  23.  
  24. const
  25.   BufferSize = 1024;
  26.  
  27. type
  28.   PBuffer = ^TBuffer;
  29.   TBuffer = packed record
  30.     bCount : longint;
  31.     bBlock : array [0..pred(BufferSize)] of byte;
  32.   end;
  33.  
  34.   PBufferArray = ^TBufferArray;
  35.   TBufferArray = array [0..1023] of PBuffer;
  36.  
  37.   TQueuedBuffers = class
  38.     private
  39.       FBufCount   : integer;
  40.       FBuffers    : PBufferArray;
  41.       FHead       : integer;
  42.       FIsNotEmpty : THandle;
  43.       FIsNotFull  : THandle;
  44.       FTail       : integer;
  45.     protected
  46.       function qbGetHead : PBuffer;
  47.       function qbGetTail : PBuffer;
  48.     public
  49.       constructor Create(aBufferCount : integer);
  50.       destructor Destroy; override;
  51.  
  52.       procedure AdvanceHead;
  53.       procedure AdvanceTail;
  54.  
  55.       property Head : PBuffer read qbGetHead;
  56.       property Tail : PBuffer read qbGetTail;
  57.  
  58.       property IsNotEmpty : THandle read FIsNotEmpty;
  59.       property IsNotFull : THandle read FIsNotFull;
  60.   end;
  61.  
  62. type
  63.   TProducer = class(TThread)
  64.     private
  65.       FStream  : TStream;
  66.       FBuffers : TQueuedBuffers;
  67.     protected
  68.       procedure Execute; override;
  69.     public
  70.       constructor Create(aStream  : TStream;
  71.                          aBuffers : TQueuedBuffers);
  72.       destructor Destroy; override;
  73.   end;
  74.  
  75. type
  76.   TConsumer = class(TThread)
  77.     private
  78.       FStream  : TStream;
  79.       FBuffers : TQueuedBuffers;
  80.     protected
  81.       procedure Execute; override;
  82.     public
  83.       constructor Create(aStream  : TStream;
  84.                          aBuffers : TQueuedBuffers);
  85.       destructor Destroy; override;
  86.   end;
  87.  
  88.  
  89. {===TQueuedBuffers===================================================}
  90. constructor TQueuedBuffers.Create(aBufferCount : integer);
  91. var
  92.   i : integer;
  93. begin
  94.   inherited Create;
  95.   {allocate the buffers}
  96.   FBuffers := AllocMem(aBufferCount * sizeof(pointer));
  97.   for i := 0 to pred(aBufferCount) do
  98.     GetMem(FBuffers^[i], sizeof(TBuffer));
  99.   FBufCount := aBufferCount;
  100.   {create the semaphores}
  101.   FIsNotFull := CreateSemaphore(nil, aBufferCount, aBufferCount, '');
  102.   FIsNotEmpty := CreateSemaphore(nil, 0, aBufferCount, '');
  103. end;
  104. {--------}
  105. destructor TQueuedBuffers.Destroy;
  106. var
  107.   i : integer;
  108. begin
  109.   {destroy the semaphores}
  110.   if (FIsNotFull <> 0) then
  111.     CloseHandle(FIsNotFull);
  112.   if (FIsNotEmpty <> 0) then
  113.     CloseHandle(FIsNotEmpty);
  114.   {free the buffers}
  115.   if (FBuffers <> nil) then begin
  116.     for i := 0 to pred(FBufCount) do
  117.       if (FBuffers^[i] <> nil) then
  118.         FreeMem(FBuffers^[i], sizeof(TBuffer));
  119.     FreeMem(FBuffers, FBufCount * sizeof(pointer));
  120.   end;
  121.   inherited Destroy;
  122. end;
  123. {--------}
  124. procedure TQueuedBuffers.AdvanceHead;
  125. begin
  126.   inc(FHead);
  127.   if (FHead = FBufCount) then
  128.     FHead := 0;
  129. end;
  130. {--------}
  131. procedure TQueuedBuffers.AdvanceTail;
  132. begin
  133.   inc(FTail);
  134.   if (FTail = FBufCount) then
  135.     FTail := 0;
  136. end;
  137. {--------}
  138. function TQueuedBuffers.qbGetHead : PBuffer;
  139. begin
  140.   Result := FBuffers^[FHead];
  141. end;
  142. {--------}
  143. function TQueuedBuffers.qbGetTail : PBuffer;
  144. begin
  145.   Result := FBuffers^[FTail];
  146. end;
  147. {====================================================================}
  148.  
  149.  
  150. {===TProducer========================================================}
  151. constructor TProducer.Create(aStream  : TStream;
  152.                              aBuffers : TQueuedBuffers);
  153. begin
  154.   inherited Create(true);
  155.   FStream := aStream;
  156.   FBuffers := aBuffers;
  157. end;
  158. {--------}
  159. destructor TProducer.Destroy;
  160. begin
  161.   inherited Destroy;
  162. end;
  163. {--------}
  164. procedure TProducer.Execute;
  165. var
  166.   Tail : PBuffer;
  167. begin
  168.   {do until the stream is exhausted...}
  169.   repeat
  170.     {get the 'queue is not full' semaphore}
  171.     WaitForSingleObject(FBuffers.IsNotFull, INFINITE);
  172.     {read a block from the stream into the tail buffer}
  173.     Tail := FBuffers.Tail;
  174.     Tail^.bCount := FStream.Read(Tail^.bBlock, BufferSize);
  175.     {advance the tail pointer}
  176.     FBuffers.AdvanceTail;
  177.     {as we've written a new buffer, signal the 'queue is not empty'
  178.      semaphore}
  179.     ReleaseSemaphore(FBuffers.IsNotEmpty, 1, nil);
  180.   until (Tail^.bCount = 0);
  181. end;
  182. {====================================================================}
  183.  
  184.  
  185. {===TConsumer========================================================}
  186. constructor TConsumer.Create(aStream  : TStream;
  187.                              aBuffers : TQueuedBuffers);
  188. begin
  189.   inherited Create(true);
  190.   FStream := aStream;
  191.   FBuffers := aBuffers;
  192. end;
  193. {--------}
  194. destructor TConsumer.Destroy;
  195. begin
  196.   inherited Destroy;
  197. end;
  198. {--------}
  199. procedure TConsumer.Execute;
  200. var
  201.   Head : PBuffer;
  202. begin
  203.   {get the 'queue is not empty' semaphore}
  204.   WaitForSingleObject(FBuffers.IsNotEmpty, INFINITE);
  205.   {get the head buffer}
  206.   Head := FBuffers.Head;
  207.   {while the head buffer is not empty...}
  208.   while (Head^.bCount <> 0) do begin
  209.     {write a block from the head buffer into the stream}
  210.     FStream.Write(Head^.bBlock, Head^.bCount);
  211.     {advance the head pointer}
  212.     FBuffers.AdvanceHead;
  213.     {as we've read a buffer, signal the 'queue is not full' semaphore}
  214.     ReleaseSemaphore(FBuffers.IsNotFull, 1, nil);
  215.     {get the 'queue is not empty' semaphore}
  216.     WaitForSingleObject(FBuffers.IsNotEmpty, INFINITE);
  217.     {get the head buffer}
  218.     Head := FBuffers.Head;
  219.   end;
  220. end;
  221. {====================================================================}
  222.  
  223.  
  224. {===Interfaced routine===============================================}
  225. procedure AAThreadedcopyStream(aSrcStream, aDestStream : TStream);
  226. var
  227.   Buffers  : TQueuedBuffers;
  228.   Producer : TProducer;
  229.   Consumer : TConsumer;
  230.   WaitArray : array [0..1] of THandle;
  231. begin
  232.   Buffers := nil;
  233.   Producer := nil;
  234.   Consumer := nil;
  235.   try
  236.     {create the queued buffer object (20 buffers) and the two threads}
  237.     Buffers := TQueuedBuffers.Create(20);
  238.     Producer := TProducer.Create(aSrcStream, Buffers);
  239.     Consumer := TConsumer.Create(aDestStream, Buffers);
  240.     {save the thread handles so we can wait on them}
  241.     WaitArray[0] := Producer.Handle;
  242.     WaitArray[1] := Consumer.Handle;
  243.     {start the threads up}
  244.     Consumer.Resume;
  245.     Producer.Resume;
  246.     {wait for the threads to finish}
  247.     WaitForMultipleObjects(2, @WaitArray, true, INFINITE);
  248.   finally
  249.     Producer.Free;
  250.     Consumer.Free;
  251.     Buffers.Free;
  252.   end;
  253. end;
  254. {====================================================================}
  255.  
  256. end.
  257.